Skip to main content
Glama
javiarmesto

Business Central MCP Server

by javiarmesto
MCP SDK Python Simple StreamableHttp Server.py5.96 kB
import contextlib import logging from collections.abc import AsyncIterator import anyio import click import mcp.types as types from mcp.server.lowlevel import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from pydantic import AnyUrl from starlette.applications import Starlette from starlette.routing import Mount from starlette.types import Receive, Scope, Send from .event_store import InMemoryEventStore # Configure logging logger = logging.getLogger(__name__) @click.command() @click.option("--port", default=3000, help="Port to listen on for HTTP") @click.option( "--log-level", default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", ) @click.option( "--json-response", is_flag=True, default=False, help="Enable JSON responses instead of SSE streams", ) def main( port: int, log_level: str, json_response: bool, ) -> int: # Configure logging logging.basicConfig( level=getattr(logging, log_level.upper()), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) app = Server("mcp-streamable-http-demo") @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[types.ContentBlock]: ctx = app.request_context interval = arguments.get("interval", 1.0) count = arguments.get("count", 5) caller = arguments.get("caller", "unknown") # Send the specified number of notifications with the given interval for i in range(count): # Include more detailed message for resumability demonstration notification_msg = ( f"[{i + 1}/{count}] Event from '{caller}' - " f"Use Last-Event-ID to resume if disconnected" ) await ctx.session.send_log_message( level="info", data=notification_msg, logger="notification_stream", # Associates this notification with the original request # Ensures notifications are sent to the correct response stream # Without this, notifications will either go to: # - a standalone SSE stream (if GET request is supported) # - nowhere (if GET request isn't supported) related_request_id=ctx.request_id, ) logger.debug(f"Sent notification {i + 1}/{count} for caller: {caller}") if i < count - 1: # Don't wait after the last notification await anyio.sleep(interval) # This will send a resource notificaiton though standalone SSE # established by GET request await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource")) return [ types.TextContent( type="text", text=( f"Sent {count} notifications with {interval}s interval" f" for caller: {caller}" ), ) ] @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="start-notification-stream", description=( "Sends a stream of notifications with configurable count" " and interval" ), inputSchema={ "type": "object", "required": ["interval", "count", "caller"], "properties": { "interval": { "type": "number", "description": "Interval between notifications in seconds", }, "count": { "type": "number", "description": "Number of notifications to send", }, "caller": { "type": "string", "description": ( "Identifier of the caller to include in notifications" ), }, }, }, ) ] # Create event store for resumability # The InMemoryEventStore enables resumability support for StreamableHTTP transport. # It stores SSE events with unique IDs, allowing clients to: # 1. Receive event IDs for each SSE message # 2. Resume streams by sending Last-Event-ID in GET requests # 3. Replay missed events after reconnection # Note: This in-memory implementation is for demonstration ONLY. # For production, use a persistent storage solution. event_store = InMemoryEventStore() # Create the session manager with our app and event store session_manager = StreamableHTTPSessionManager( app=app, event_store=event_store, # Enable resumability json_response=json_response, ) # ASGI handler for streamable HTTP connections async def handle_streamable_http( scope: Scope, receive: Receive, send: Send ) -> None: await session_manager.handle_request(scope, receive, send) @contextlib.asynccontextmanager async def lifespan(app: Starlette) -> AsyncIterator[None]: """Context manager for managing session manager lifecycle.""" async with session_manager.run(): logger.info("Application started with StreamableHTTP session manager!") try: yield finally: logger.info("Application shutting down...") # Create an ASGI application using the transport starlette_app = Starlette( debug=True, routes=[ Mount("/mcp", app=handle_streamable_http), ], lifespan=lifespan, ) import uvicorn uvicorn.run(starlette_app, host="127.0.0.1", port=port) return 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/javiarmesto/Lab3_1_MCP_BusinessCentral'

If you have feedback or need assistance with the MCP directory API, please join our Discord server